package cloud.xiguapi.ubas.analysis.hotitems.model;

import cloud.xiguapi.ubas.model.HotItemsViewCount;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @author 大大大西西瓜皮🍉
 * date: 2021-5-18 下午 05:12
 * desc:
 */
public class KafkaWindowHotItemsCountResult implements WindowFunction<Long, HotItemsViewCount, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Long> input, Collector<HotItemsViewCount> out) {
        // 获取ItemViewCount的字段
        Long itemId = tuple.getField(0);
        Long windowEnd = window.getEnd();
        Long count = input.iterator().next();

        out.collect(HotItemsViewCount.builder()
                .itemId(itemId)
                .windowEnd(windowEnd)
                .count(count)
                .build());
    }
}